{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "KQp4e2JqzMtD"
      },
      "outputs": [],
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License"
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Embedding Ingestion and Vector Search with Apache Beam and BigQuery\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/bigquery_vector_ingestion_and_search.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/bigquery_vector_ingestion_and_search.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>\n"
      ],
      "metadata": {
        "id": "fHkzKF5VzW84"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Introduction\n",
        "\n",
        "This Colab demonstrates how to use the [Apache Beam RAG package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.html) to generate embeddings, ingest them into BigQuery, and perform [vector similarity search](https://cloud.google.com/bigquery/docs/reference/standard-sql/search_functions#vector_search).\n",
        "\n",
        "The notebook is divided into two main parts:\n",
        "1. **Basic Example**: Using the default schema for simple vector search\n",
        "2. **Advanced Example**: Using a custom schema and metadata filtering\n",
        "\n",
        "## Example: Product Catalog\n",
        "\n",
        "We'll work with a sample e-commerce dataset representing a product catalog. Each product has:\n",
        "\n",
        "*   **Structured fields:** `id`, `name`, `category`, `price`, etc.\n",
        "*   **Detailed text descriptions:** Longer text describing the product's features.\n",
        "*   **Additional metadata:** `brand`, `features`, `dimensions`, etc.\n",
        "\n",
        "# Setup and Prerequisites\n",
        "\n",
        "This example requires:\n",
        "1. A Google Cloud project with BigQuery enabled\n",
        "2. Apache Beam 2.64.0 or later\n",
        "\n",
        "## Install Packages and Dependencies\n",
        "\n",
        "First, let's install the Python packages required for the embedding and ingestion pipeline:"
      ],
      "metadata": {
        "id": "c6o1dCiU0DTO"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Apache Beam with GCP support\n",
        "!pip install 'apache_beam[interactive,gcp]>=2.64.0' --quiet\n",
        "# Huggingface sentence-transformers for embedding models\n",
        "!pip install sentence-transformers --quiet\n"
      ],
      "metadata": {
        "id": "YD_0mocy1SWU"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Authenticate to Google Cloud\n",
        "\n",
        "To connect to BigQuery, we authenticate with Google Cloud."
      ],
      "metadata": {
        "id": "0CrDVBz80WPN"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "PROJECT_ID = \"\"  # @param {type:\"string\"}\n",
        "\n",
        "# Authentication and project setup\n",
        "from google.colab import auth\n",
        "auth.authenticate_user(project_id=PROJECT_ID)"
      ],
      "metadata": {
        "id": "XkifVvXf1QDL"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Create BigQuery Dataset\n",
        "\n",
        "Let's set up a BigQuery dataset and table to store our embeddings:"
      ],
      "metadata": {
        "id": "fceXm66K3vvr"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "DATASET_ID = \"\" # @param {type:\"string\"}\n",
        "TEMP_GCS_LOCATION = \"gs://\" # @param {type:\"string\"}"
      ],
      "metadata": {
        "id": "yKV9xL_83y6M"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "from google.cloud import bigquery\n",
        "\n",
        "# Create BigQuery client\n",
        "client = bigquery.Client(project=PROJECT_ID)\n",
        "\n",
        "# Create dataset\n",
        "dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)\n",
        "try:\n",
        "    client.get_dataset(dataset_ref)\n",
        "    print(f\"Dataset {DATASET_ID} already exists\")\n",
        "except Exception:\n",
        "    dataset = bigquery.Dataset(dataset_ref)\n",
        "    dataset.location = \"US\"\n",
        "    dataset = client.create_dataset(dataset)\n",
        "    print(f\"Created dataset {DATASET_ID}\")"
      ],
      "metadata": {
        "id": "PVFMFHs037FD"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Importing Pipeline Components\n",
        "\n",
        "We import the following components to configure our embedding ingestion pipeline:\n",
        "- `Chunk`, the structure that represents embeddable content with metadata\n",
        "- `BigQueryVectorWriterConfig` for configuring write behavior"
      ],
      "metadata": {
        "id": "8GWz3xOy4ZKg"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Embedding-specific imports\n",
        "from apache_beam.ml.rag.ingestion.bigquery import BigQueryVectorWriterConfig, SchemaConfig\n",
        "from apache_beam.ml.rag.ingestion.base import VectorDatabaseWriteTransform\n",
        "from apache_beam.ml.rag.types import Chunk, Content\n",
        "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n",
        "from apache_beam.ml.rag.enrichment.bigquery_vector_search import (\n",
        "    BigQueryVectorSearchParameters,\n",
        "    BigQueryVectorSearchEnrichmentHandler\n",
        ")\n",
        "\n",
        "# Apache Beam core\n",
        "import apache_beam as beam\n",
        "from apache_beam.options import pipeline_options\n",
        "from apache_beam.options.pipeline_options import PipelineOptions\n",
        "from apache_beam.ml.transforms.base import MLTransform\n",
        "from apache_beam.transforms.enrichment import Enrichment"
      ],
      "metadata": {
        "id": "dlr2epJ44YkB"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Define helper functions\n",
        "\n",
        "To run the streaming examples, we define helper functions to:\n",
        "- Create a PubSub topic\n",
        "- Publish messages to a PubSub topic in a background thread"
      ],
      "metadata": {
        "id": "rU0yWV6OFV3A"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Set up PubSub topic\n",
        "from google.api_core.exceptions import AlreadyExists\n",
        "from google.cloud import pubsub_v1\n",
        "import threading\n",
        "import time\n",
        "import datetime\n",
        "import json\n",
        "\n",
        "def create_pubsub_topic(project_id, topic):\n",
        "  publisher = pubsub_v1.PublisherClient()\n",
        "  topic_path = publisher.topic_path(project_id, topic)\n",
        "  try:\n",
        "      response = publisher.create_topic(request={\"name\": topic_path})\n",
        "      print(f\"Created topic: {response.name}\")\n",
        "  except AlreadyExists:\n",
        "      print(f\"Topic {topic_path} already exists.\")\n",
        "  return topic_path\n",
        "\n",
        "def publisher_function(project_id, topic, sample_data):\n",
        "  \"\"\"Function that publishes sample queries to a PubSub topic.\n",
        "\n",
        "  This function runs in a separate thread and continuously publishes\n",
        "  messages to simulate real-time user queries.\n",
        "  \"\"\"\n",
        "  publisher = pubsub_v1.PublisherClient()\n",
        "  topic_path = publisher.topic_path(project_id, topic)\n",
        "  # Give the streaming pipeline time to start before publishing\n",
        "  time.sleep(15)\n",
        "  for message in sample_data:\n",
        "\n",
        "      # Convert to JSON and publish\n",
        "      data = json.dumps(message).encode('utf-8')\n",
        "      try:\n",
        "          publisher.publish(topic_path, data)\n",
        "      except Exception:\n",
        "          pass  # Silently continue on error\n",
        "\n",
        "      # Wait 7 seconds before next message\n",
        "      time.sleep(7)"
      ],
      "metadata": {
        "id": "O-XQypQ4FnZ1"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Quick start: Embedding Generation and Ingestion with Default Schema\n",
        "\n",
        "## Create Sample Product Catalog Data\n",
        "\n",
        "First, we create a sample product catalog with descriptions to be embedded:"
      ],
      "metadata": {
        "id": "_aeyG1MU4d8B"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "#@title Create product catalog data with rich descriptions for semantic search\n",
        "\n",
        "PRODUCTS_DATA = [\n",
        "    {\n",
        "        \"id\": \"laptop-001\",\n",
        "        \"name\": \"UltraBook Pro X15\",\n",
        "        \"description\": \"Powerful ultralight laptop featuring a 15-inch 4K OLED display, 12th Gen Intel i9 processor, 32GB RAM, and 1TB SSD. Perfect for creative professionals, developers, and power users who need exceptional performance in a slim form factor. Includes Thunderbolt 4 ports, all-day battery life, and advanced cooling system.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Laptops\",\n",
        "        \"price\": 1899.99,\n",
        "        \"brand\": \"TechMaster\",\n",
        "        \"features\": [\"4K OLED Display\", \"Intel i9\", \"32GB RAM\", \"1TB SSD\", \"Thunderbolt 4\"],\n",
        "        \"weight\": \"3.5 lbs\",\n",
        "        \"dimensions\": \"14.1 x 9.3 x 0.6 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"phone-001\",\n",
        "        \"name\": \"Galaxy Ultra S23\",\n",
        "        \"description\": \"Flagship smartphone with a stunning 6.8-inch Dynamic AMOLED display, 200MP camera system, and 5nm processor. Features 8K video recording, 5G connectivity, and all-day battery life. Water and dust resistant with IP68 rating. Perfect for photography enthusiasts, mobile gamers, and professionals who need reliable performance.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Smartphones\",\n",
        "        \"price\": 1199.99,\n",
        "        \"brand\": \"Samsung\",\n",
        "        \"features\": [\"200MP Camera\", \"6.8-inch AMOLED\", \"5G\", \"IP68 Water Resistant\", \"8K Video\"],\n",
        "        \"weight\": \"8.2 oz\",\n",
        "        \"dimensions\": \"6.4 x 3.1 x 0.35 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"headphones-001\",\n",
        "        \"name\": \"SoundSphere Pro\",\n",
        "        \"description\": \"Premium wireless noise-cancelling headphones with spatial audio technology and adaptive EQ. Features 40 hours of battery life, memory foam ear cushions, and voice assistant integration. Seamlessly switch between devices with multi-point Bluetooth connectivity. Ideal for audiophiles, frequent travelers, and professionals working in noisy environments.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Audio\",\n",
        "        \"price\": 349.99,\n",
        "        \"brand\": \"AudioTech\",\n",
        "        \"features\": [\"Active Noise Cancellation\", \"Spatial Audio\", \"40hr Battery\", \"Bluetooth 5.2\", \"Voice Assistant\"],\n",
        "        \"weight\": \"9.8 oz\",\n",
        "        \"dimensions\": \"7.5 x 6.8 x 3.2 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"coffee-001\",\n",
        "        \"name\": \"BrewMaster 5000\",\n",
        "        \"description\": \"Smart coffee maker with precision temperature control, customizable brewing profiles, and app connectivity. Schedule brewing times, adjust strength, and receive maintenance alerts from your smartphone. Features a built-in grinder, 12-cup capacity, and thermal carafe to keep coffee hot for hours. Perfect for coffee enthusiasts and busy professionals.\",\n",
        "        \"category\": \"Home & Kitchen\",\n",
        "        \"subcategory\": \"Appliances\",\n",
        "        \"price\": 199.99,\n",
        "        \"brand\": \"HomeBarista\",\n",
        "        \"features\": [\"Smart App Control\", \"Built-in Grinder\", \"Thermal Carafe\", \"Customizable Brewing\", \"12-cup Capacity\"],\n",
        "        \"weight\": \"12.5 lbs\",\n",
        "        \"dimensions\": \"10.5 x 8.2 x 14.3 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"chair-001\",\n",
        "        \"name\": \"ErgoFlex Executive Chair\",\n",
        "        \"description\": \"Ergonomic office chair with dynamic lumbar support, adjustable armrests, and breathable mesh back. Features 5-point adjustability, premium cushioning, and smooth-rolling casters. Designed to reduce back pain and improve posture during long work sessions. Ideal for home offices, professionals, and anyone who sits for extended periods.\",\n",
        "        \"category\": \"Furniture\",\n",
        "        \"subcategory\": \"Office Furniture\",\n",
        "        \"price\": 329.99,\n",
        "        \"brand\": \"ComfortDesign\",\n",
        "        \"features\": [\"Lumbar Support\", \"Adjustable Armrests\", \"Mesh Back\", \"5-point Adjustment\", \"Premium Cushioning\"],\n",
        "        \"weight\": \"45 lbs\",\n",
        "        \"dimensions\": \"28 x 25 x 45 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"sneakers-001\",\n",
        "        \"name\": \"CloudStep Running Shoes\",\n",
        "        \"description\": \"Lightweight performance running shoes with responsive cushioning, breathable knit upper, and carbon fiber plate for energy return. Features adaptive arch support, reflective elements for visibility, and durable rubber outsole. Designed for marathon runners, daily joggers, and fitness enthusiasts seeking comfort and performance.\",\n",
        "        \"category\": \"Apparel\",\n",
        "        \"subcategory\": \"Footwear\",\n",
        "        \"price\": 159.99,\n",
        "        \"brand\": \"AthleteElite\",\n",
        "        \"features\": [\"Responsive Cushioning\", \"Carbon Fiber Plate\", \"Breathable Knit\", \"Adaptive Support\", \"Reflective Elements\"],\n",
        "        \"weight\": \"8.7 oz\",\n",
        "        \"dimensions\": \"12 x 4.5 x 5 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"blender-001\",\n",
        "        \"name\": \"NutriBlend Pro\",\n",
        "        \"description\": \"High-performance blender with 1200W motor, variable speed control, and pre-programmed settings for smoothies, soups, and nut butters. Features stainless steel blades, 64oz BPA-free container, and pulse function. Includes personal blending cups for on-the-go nutrition. Perfect for health-conscious individuals, busy families, and culinary enthusiasts.\",\n",
        "        \"category\": \"Home & Kitchen\",\n",
        "        \"subcategory\": \"Appliances\",\n",
        "        \"price\": 149.99,\n",
        "        \"brand\": \"KitchenPro\",\n",
        "        \"features\": [\"1200W Motor\", \"Variable Speed\", \"Pre-programmed Settings\", \"64oz Container\", \"Personal Cups\"],\n",
        "        \"weight\": \"11.8 lbs\",\n",
        "        \"dimensions\": \"8.5 x 9.5 x 17.5 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"camera-001\",\n",
        "        \"name\": \"ProShot X7 Mirrorless Camera\",\n",
        "        \"description\": \"Professional mirrorless camera with 45MP full-frame sensor, 8K video recording, and advanced autofocus system with subject recognition. Features in-body stabilization, weather sealing, and dual card slots. Includes a versatile 24-105mm lens. Ideal for professional photographers, videographers, and serious enthusiasts seeking exceptional image quality.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Cameras\",\n",
        "        \"price\": 2499.99,\n",
        "        \"brand\": \"OptiView\",\n",
        "        \"features\": [\"45MP Sensor\", \"8K Video\", \"Advanced Autofocus\", \"In-body Stabilization\", \"Weather Sealed\"],\n",
        "        \"weight\": \"1.6 lbs (body only)\",\n",
        "        \"dimensions\": \"5.4 x 3.8 x 3.2 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"watch-001\",\n",
        "        \"name\": \"FitTrack Ultra Smartwatch\",\n",
        "        \"description\": \"Advanced fitness smartwatch with continuous health monitoring, GPS tracking, and 25-day battery life. Features ECG, blood oxygen monitoring, sleep analysis, and 30+ sport modes. Water-resistant to 50m with a durable sapphire crystal display. Perfect for athletes, fitness enthusiasts, and health-conscious individuals tracking wellness metrics.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Wearables\",\n",
        "        \"price\": 299.99,\n",
        "        \"brand\": \"FitTech\",\n",
        "        \"features\": [\"ECG Monitor\", \"GPS Tracking\", \"25-day Battery\", \"Blood Oxygen\", \"30+ Sport Modes\"],\n",
        "        \"weight\": \"1.6 oz\",\n",
        "        \"dimensions\": \"1.7 x 1.7 x 0.5 inches\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"backpack-001\",\n",
        "        \"name\": \"Voyager Pro Travel Backpack\",\n",
        "        \"description\": \"Premium travel backpack with anti-theft features, expandable capacity, and dedicated laptop compartment. Features water-resistant materials, hidden pockets, and ergonomic design with padded straps. Includes USB charging port and luggage pass-through. Ideal for business travelers, digital nomads, and adventure seekers needing secure, organized storage.\",\n",
        "        \"category\": \"Travel\",\n",
        "        \"subcategory\": \"Luggage\",\n",
        "        \"price\": 129.99,\n",
        "        \"brand\": \"TrekGear\",\n",
        "        \"features\": [\"Anti-theft Design\", \"Expandable\", \"Laptop Compartment\", \"Water Resistant\", \"USB Charging Port\"],\n",
        "        \"weight\": \"2.8 lbs\",\n",
        "        \"dimensions\": \"20 x 12 x 8 inches\"\n",
        "    }\n",
        "]\n",
        "\n",
        "print(f\"Created product catalog with {len(PRODUCTS_DATA)} products\")"
      ],
      "metadata": {
        "cellView": "form",
        "id": "YFvFenMN4I6n"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Create BigQuery Table"
      ],
      "metadata": {
        "id": "91X8wuQGAjJf"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from google.cloud import bigquery\n",
        "\n",
        "# Create BigQuery client\n",
        "client = bigquery.Client(project=PROJECT_ID)\n",
        "\n",
        "DEFAULT_TABLE_ID = f\"{PROJECT_ID}.{DATASET_ID}.default_product_embeddings\"\n",
        "default_schema = [\n",
        "    bigquery.SchemaField(\"id\", \"STRING\", mode=\"REQUIRED\"),\n",
        "    bigquery.SchemaField(\"embedding\", \"FLOAT64\", mode=\"REPEATED\"),\n",
        "    bigquery.SchemaField(\"content\", \"STRING\"),\n",
        "    bigquery.SchemaField(\"metadata\", \"RECORD\", mode=\"REPEATED\", fields=[\n",
        "        bigquery.SchemaField(\"key\", \"STRING\"),\n",
        "        bigquery.SchemaField(\"value\", \"STRING\")\n",
        "    ])\n",
        "]\n",
        "\n",
        "default_table = bigquery.Table(DEFAULT_TABLE_ID, schema=default_schema)\n",
        "try:\n",
        "    client.get_table(default_table)\n",
        "    print(f\"Table {DEFAULT_TABLE_ID} already exists\")\n",
        "except Exception:\n",
        "    default_table = client.create_table(default_table)\n",
        "    print(f\"Created table {DEFAULT_TABLE_ID}\")"
      ],
      "metadata": {
        "id": "DmqSEYwgAvde"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Define Pipeline components\n",
        "\n",
        "Next, we define pipeline components that:\n",
        "1. Convert product data to `Chunk` type\n",
        "2. Generate Embeddings using a pre-trained model\n",
        "3. Write to BigQuery\n",
        "\n",
        "### Map products to Chunks\n",
        "\n",
        "We define a function that converts each ingested product dictionary to a `Chunk`, configuring what text to embed and what to treat as metadata."
      ],
      "metadata": {
        "id": "ltHo5P6p8YGj"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from typing import Dict, Any\n",
        "\n",
        "# The create_chunk function converts our product dictionaries to Chunks.\n",
        "# This doesn't split the text - it simply structures it in the format\n",
        "# expected by the embedding pipeline components.\n",
        "def create_chunk(product: Dict[str, Any]) -> Chunk:\n",
        "    \"\"\"Convert a product dictionary into a Chunk object.\n",
        "\n",
        "       The pipeline components (MLTransform, VectorDatabaseWriteTransform)\n",
        "       work with Chunk objects. This function:\n",
        "       1. Extracts text we want to embed\n",
        "       2. Preserves product data as metadata\n",
        "       3. Creates a Chunk in the expected format\n",
        "\n",
        "    Args:\n",
        "        product: Dictionary containing product information\n",
        "\n",
        "    Returns:\n",
        "        Chunk: A Chunk object ready for embedding\n",
        "    \"\"\"\n",
        "    # Combine name and description for embedding\n",
        "    text_to_embed = f\"{product['name']}: {product['description']}\"\n",
        "\n",
        "    return Chunk(\n",
        "        content=Content(text=text_to_embed),  # The text that will be embedded\n",
        "        id=product['id'],                     # Use product ID as chunk ID\n",
        "        metadata=product,                     # Store all product info in metadata\n",
        "    )"
      ],
      "metadata": {
        "id": "Ki78caDA8CDF"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Generate embeddings with HuggingFace\n",
        "\n",
        "We configure a local pre-trained Hugging Face model to create vector embeddings from the product descriptions."
      ],
      "metadata": {
        "id": "g8ER6InF8_7b"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure the embedding model\n",
        "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
        "    model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
        ")"
      ],
      "metadata": {
        "id": "dtTRSH4c9FUb"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Write to BigQuery\n",
        "\n",
        "The default `BigQueryVectorWriterConfig` maps `Chunk` fields to database columns as follows:\n",
        "\n",
        "| Database Column | Chunk Field | Description |\n",
        "|----------------|-------------|-------------|\n",
        "| id             | chunk.id    | Unique identifier |\n",
        "| embedding      | chunk.embedding.dense_embedding | Vector representation |\n",
        "| content        | chunk.content.text | Text that was embedded |\n",
        "| metadata       | chunk.metadata | Additional data as RECORD |"
      ],
      "metadata": {
        "id": "l_-sWRbv9HBF"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure BigQuery writer with default schema\n",
        "bigquery_writer_config = BigQueryVectorWriterConfig(\n",
        "    write_config={\n",
        "        'table': DEFAULT_TABLE_ID,\n",
        "        'create_disposition': 'CREATE_IF_NEEDED',\n",
        "        'write_disposition': 'WRITE_TRUNCATE'  # Overwrite existing data\n",
        "    }\n",
        ")"
      ],
      "metadata": {
        "id": "mw1pWWBJ9Lll"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Assemble and Run Pipeline\n",
        "\n",
        "Now we can create our pipeline that:\n",
        "1. Ingests our product data\n",
        "2. Converts each product to a Chunk\n",
        "3. Generates embeddings for each Chunk\n",
        "4. Stores everything in BigQuery"
      ],
      "metadata": {
        "id": "fn2K0Uwd9a13"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import tempfile\n",
        "\n",
        "options = pipeline_options.PipelineOptions([f\"--temp_location={TEMP_GCS_LOCATION}\"])\n",
        "\n",
        "# Run batch pipeline\n",
        "with beam.Pipeline(options=options) as p:\n",
        "    _ = (\n",
        "        p\n",
        "        | 'Create Products' >> beam.Create(PRODUCTS_DATA)\n",
        "        | 'Convert to Chunks' >> beam.Map(create_chunk)\n",
        "        | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
        "          .with_transform(huggingface_embedder)\n",
        "        | 'Write to BigQuery' >> VectorDatabaseWriteTransform(bigquery_writer_config)\n",
        "    )"
      ],
      "metadata": {
        "id": "x1gC5Qsx9fNp"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Verify Embeddings\n",
        "\n",
        "Let's check what was written to our BigQuery table:"
      ],
      "metadata": {
        "id": "cJ5a1j7F9h4n"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Query to verify the embeddings\n",
        "query = f\"\"\"\n",
        "SELECT\n",
        "  id,\n",
        "  ARRAY_LENGTH(embedding) as embedding_dimensions,\n",
        "  content,\n",
        "  (SELECT COUNT(*) FROM UNNEST(metadata)) as metadata_count\n",
        "FROM\n",
        "  `{DEFAULT_TABLE_ID}`\n",
        "LIMIT 5\n",
        "\"\"\"\n",
        "\n",
        "# Run the query\n",
        "query_job = client.query(query)\n",
        "results = query_job.result()\n",
        "\n",
        "# Display results\n",
        "for row in results:\n",
        "    print(f\"Product ID: {row.id}\")\n",
        "    print(f\"Embedding Dimensions: {row.embedding_dimensions}\")\n",
        "    print(f\"Content: {row.content[:100]}...\")  # Show first 100 chars\n",
        "    print(f\"Metadata Count: {row.metadata_count}\")\n",
        "    print(\"-\" * 80)"
      ],
      "metadata": {
        "id": "35_DnMzZ9kCZ"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Quick start: Vector Search\n",
        "\n",
        "Prerequisites:\n",
        "- Quick start: Embedding Generation and Ingestion with Default Schema\n",
        "\n",
        "\n",
        "In this section, we create a streaming pipeline that:\n",
        "- Reads queries from PubSub\n",
        "- Embeds the queries\n",
        "- Performs Vector Search on the ingested product catalog data\n",
        "- Logs the queries enriched with product catalog data\n",
        "\n",
        "## Define Sample Queries\n"
      ],
      "metadata": {
        "id": "NYmojGx_43WQ"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "SAMPLE_QUERIES = [\n",
        "    {\"query\": \"I need a powerful laptop for video editing and programming\"},\n",
        "    {\"query\": \"Looking for noise-cancelling headphones for travel\"},\n",
        "    {\"query\": \"What's a good ergonomic office chair for long work hours?\"},\n",
        "    {\"query\": \"I want a waterproof portable speaker for the beach\"},\n",
        "    {\"query\": \"Need a professional camera for wildlife photography\"}\n",
        "]"
      ],
      "metadata": {
        "id": "GbOxY-7EDg0W"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Set up PubSub Streaming Source\n",
        "We create a PubSub topic for our pipeline's data source."
      ],
      "metadata": {
        "id": "BDTPnu36EWzJ"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Create pubsub topic\n",
        "TOPIC = \"\" # @param {type:'string'}\n",
        "\n",
        "topic_path = create_pubsub_topic(PROJECT_ID, TOPIC)\n"
      ],
      "metadata": {
        "id": "bn6exZ7RDKAm"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Define pipeline components\n",
        "Next, we define each component of the search pipeline.\n",
        "### Process PubSub messages\n"
      ],
      "metadata": {
        "id": "0E6DkxG6EhwE"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "def process_query(message):\n",
        "    \"\"\"Convert a pubsub message to a Chunk for embedding and search.\"\"\"\n",
        "    message_data = json.loads(message.decode('utf-8'))\n",
        "    return Chunk(\n",
        "        content=Content(text=message_data['query']),\n",
        "        metadata={\"query_type\": \"product_search\"}\n",
        "    )"
      ],
      "metadata": {
        "id": "OzWxUujQGEnE"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
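    {
      "cell_type": "markdown",
      "source": [
        "Before wiring `process_query` into the pipeline, we can sanity-check the decode step it performs. This is a minimal local sketch, assuming the publisher JSON-encodes each query dictionary to UTF-8 bytes; it is illustrative only and not part of the pipeline:"
      ],
      "metadata": {
        "id": "x1DecodeDemoMd"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import json\n",
        "\n",
        "# A message as the publisher would encode it (assumption: UTF-8 JSON bytes)\n",
        "raw = json.dumps({'query': 'Looking for noise-cancelling headphones for travel'}).encode('utf-8')\n",
        "\n",
        "# The decode step process_query performs before building a Chunk\n",
        "message_data = json.loads(raw.decode('utf-8'))\n",
        "print(message_data['query'])"
      ],
      "metadata": {
        "id": "x1DecodeDemoCode"
      },
      "execution_count": null,
      "outputs": []
    },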
    {
      "cell_type": "markdown",
      "source": [
        "### Configure embedding model\n"
      ],
      "metadata": {
        "id": "HkHew4dRGHEM"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure the embedding model\n",
        "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
        "    model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
        ")"
      ],
      "metadata": {
        "id": "uBXMzSJ_GPaO"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Configure vector search\n"
      ],
      "metadata": {
        "id": "N_9F6bDqGSeb"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure vector search parameters (no filters)\n",
        "vector_search_params = BigQueryVectorSearchParameters(\n",
        "    project=PROJECT_ID,\n",
        "    table_name=DEFAULT_TABLE_ID,\n",
        "    embedding_column=\"embedding\",\n",
        "    columns=[\"content\", \"metadata\"],\n",
        "    neighbor_count=1  # Return top match\n",
        ")\n",
        "\n",
        "# Create search handler\n",
        "search_handler = BigQueryVectorSearchEnrichmentHandler(\n",
        "    vector_search_parameters=vector_search_params,\n",
        "    min_batch_size=1,\n",
        "    max_batch_size=5\n",
        ")"
      ],
      "metadata": {
        "id": "CFdqVCmUGa8-"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Log the enriched query"
      ],
      "metadata": {
        "id": "5BQaambeGXd8"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "def log_results(chunk):\n",
        "    \"\"\"Format search results for display.\"\"\"\n",
        "    # Extract results from enrichment_data\n",
        "    results = chunk.metadata.get(\"enrichment_data\", {}).get(\"chunks\", [])\n",
        "\n",
        "    # Log the query\n",
        "    print(f\"\\n=== QUERY: \\\"{chunk.content.text}\\\" ===\")\n",
        "\n",
        "    # Log the results\n",
        "    print(f\"Found {len(results)} matching products:\")\n",
        "\n",
        "    if results:\n",
        "        for i, result in enumerate(results, 1):\n",
        "            # Convert metadata array to dictionary\n",
        "            product_metadata = {}\n",
        "            if \"metadata\" in result:\n",
        "                for item in result.get(\"metadata\", []):\n",
        "                    product_metadata[item[\"key\"]] = item[\"value\"]\n",
        "\n",
        "            # Print product details\n",
        "            print(f\"\\nResult {i}:\")\n",
        "            print(f\"  Product: {product_metadata.get('name', 'Unknown')}\")\n",
        "            print(f\"  Brand: {product_metadata.get('brand', 'Unknown')}\")\n",
        "            print(f\"  Category: {product_metadata.get('category', 'Unknown')} > {product_metadata.get('subcategory', 'Unknown')}\")\n",
        "            print(f\"  Price: ${product_metadata.get('price', 'Unknown')}\")\n",
        "            print(f\"  Description: {product_metadata.get('description', 'Unknown')[:100]}...\")\n",
        "    else:\n",
        "        print(\"  No matching products found.\")\n",
        "\n",
        "    print(\"=\" * 80)\n",
        "\n",
        "    return chunk"
      ],
      "metadata": {
        "id": "YqD4-1I2GLEz"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Run the Basic Search Pipeline\n",
        "\n",
        "Now we'll start publishing messages to PubSub in the background, and run our pipeline to:\n",
        "1. Process the sample queries\n",
        "2. Generate embeddings for each query\n",
        "3. Perform vector search in BigQuery\n",
        "4. Format and display the results\n",
        "\n",
        "Note: This pipeline will keep running until the execution is interrupted."
      ],
      "metadata": {
        "id": "17NfPU07CccH"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "print(\"Starting publisher thread...\")\n",
        "publisher_thread = threading.Thread(\n",
        "    target=publisher_function,\n",
        "    args=(PROJECT_ID, TOPIC, SAMPLE_QUERIES),\n",
        "    daemon=True\n",
        ")\n",
        "publisher_thread.start()\n",
        "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n",
        "\n",
        "import tempfile\n",
        "from apache_beam.transforms import trigger\n",
        "\n",
        "options = pipeline_options.PipelineOptions()\n",
        "options.view_as(pipeline_options.StandardOptions).streaming = True\n",
        "# Run the streaming pipeline\n",
        "print(\"Running pipeline...\")\n",
        "with beam.Pipeline(options=options) as p:\n",
        "    results = (\n",
        "        p\n",
        "        | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=topic_path)\n",
        "        | 'Process Messages' >> beam.Map(process_query)\n",
        "        | 'Window' >> beam.WindowInto(\n",
        "            beam.window.GlobalWindows(),\n",
        "            trigger=trigger.Repeatedly(trigger.AfterProcessingTime(1)),\n",
        "            accumulation_mode=trigger.AccumulationMode.DISCARDING)\n",
        "        | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
        "          .with_transform(huggingface_embedder)\n",
        "        | 'Vector Search' >> Enrichment(search_handler)\n",
        "        | 'Log Results' >> beam.Map(log_results)\n",
        "    )"
      ],
      "metadata": {
        "id": "bK2KqLPSCady"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Advanced: Embedding Generation and Ingestion with Custom Schema\n",
        "\n",
        "In this part, we create pipelines that:\n",
        "- Write embeddings to a BigQuery table with a custom schema\n",
        "- Perform vector search with metadata filters\n",
        "\n"
      ],
      "metadata": {
        "id": "IQv4OsYFNDYW"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Create Product Dataset with Multiple Items per Category\n",
        "\n",
        "Let's create a more focused product dataset with multiple items in each category to better demonstrate filtering:"
      ],
      "metadata": {
        "id": "oEtvW2kP_8C2"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "FILTERED_PRODUCTS_DATA = [\n",
        "    # Electronics - Laptops (3 items with different price points)\n",
        "    {\n",
        "        \"id\": \"laptop-001\",\n",
        "        \"name\": \"UltraBook Pro X15\",\n",
        "        \"description\": \"Powerful ultralight laptop featuring a 15-inch 4K OLED display, 12th Gen Intel i9 processor, 32GB RAM, and 1TB SSD. Perfect for creative professionals, developers, and power users who need exceptional performance in a slim form factor.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Laptops\",\n",
        "        \"price\": 1899.99,\n",
        "        \"brand\": \"TechMaster\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"laptop-002\",\n",
        "        \"name\": \"UltraBook Air 13\",\n",
        "        \"description\": \"Thin and light laptop with 13-inch Retina display, M2 chip, 16GB RAM, and 512GB SSD. Ideal for students, travelers, and professionals who need portability without sacrificing performance.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Laptops\",\n",
        "        \"price\": 1299.99,\n",
        "        \"brand\": \"TechMaster\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"laptop-003\",\n",
        "        \"name\": \"PowerBook Gaming Pro\",\n",
        "        \"description\": \"High-performance gaming laptop with 17-inch 144Hz display, RTX 3080 graphics, Intel i7 processor, 32GB RAM, and 1TB SSD. Designed for serious gamers and content creators who need desktop-class performance in a portable package.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Laptops\",\n",
        "        \"price\": 2199.99,\n",
        "        \"brand\": \"GameTech\"\n",
        "    },\n",
        "\n",
        "    # Electronics - Headphones (3 items with different price points)\n",
        "    {\n",
        "        \"id\": \"headphones-001\",\n",
        "        \"name\": \"SoundSphere Pro\",\n",
        "        \"description\": \"Premium wireless noise-cancelling headphones with spatial audio technology and adaptive EQ. Features 40 hours of battery life, memory foam ear cushions, and voice assistant integration.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Headphones\",\n",
        "        \"price\": 349.99,\n",
        "        \"brand\": \"AudioTech\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"headphones-002\",\n",
        "        \"name\": \"SoundSphere Sport\",\n",
        "        \"description\": \"Wireless sport earbuds with sweat and water resistance, secure fit, and 8-hour battery life. Perfect for workouts, running, and active lifestyles.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Headphones\",\n",
        "        \"price\": 129.99,\n",
        "        \"brand\": \"AudioTech\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"headphones-003\",\n",
        "        \"name\": \"BassBoost Studio\",\n",
        "        \"description\": \"Professional studio headphones with high-fidelity sound, premium materials, and exceptional comfort for long sessions. Designed for audio engineers, musicians, and audiophiles.\",\n",
        "        \"category\": \"Electronics\",\n",
        "        \"subcategory\": \"Headphones\",\n",
        "        \"price\": 249.99,\n",
        "        \"brand\": \"SoundPro\"\n",
        "    },\n",
        "\n",
        "    # Home & Kitchen - Coffee Makers (3 items with different price points)\n",
        "    {\n",
        "        \"id\": \"coffee-001\",\n",
        "        \"name\": \"BrewMaster 5000\",\n",
        "        \"description\": \"Smart coffee maker with precision temperature control, customizable brewing profiles, and app connectivity. Schedule brewing times, adjust strength, and receive maintenance alerts from your smartphone.\",\n",
        "        \"category\": \"Home & Kitchen\",\n",
        "        \"subcategory\": \"Coffee Makers\",\n",
        "        \"price\": 199.99,\n",
        "        \"brand\": \"HomeBarista\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"coffee-002\",\n",
        "        \"name\": \"BrewMaster Espresso\",\n",
        "        \"description\": \"Semi-automatic espresso machine with 15-bar pressure pump, milk frother, and programmable settings. Make cafe-quality espresso, cappuccino, and latte at home.\",\n",
        "        \"category\": \"Home & Kitchen\",\n",
        "        \"subcategory\": \"Coffee Makers\",\n",
        "        \"price\": 299.99,\n",
        "        \"brand\": \"HomeBarista\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"coffee-003\",\n",
        "        \"name\": \"BrewMaster Basic\",\n",
        "        \"description\": \"Simple, reliable drip coffee maker with 12-cup capacity, programmable timer, and auto-shutoff. Perfect for everyday coffee drinkers who want convenience and consistency.\",\n",
        "        \"category\": \"Home & Kitchen\",\n",
        "        \"subcategory\": \"Coffee Makers\",\n",
        "        \"price\": 49.99,\n",
        "        \"brand\": \"HomeBarista\"\n",
        "    },\n",
        "\n",
        "    # Furniture - Office Chairs (3 items with different price points)\n",
        "    {\n",
        "        \"id\": \"chair-001\",\n",
        "        \"name\": \"ErgoFlex Executive Chair\",\n",
        "        \"description\": \"Ergonomic office chair with dynamic lumbar support, adjustable armrests, and breathable mesh back. Features 5-point adjustability, premium cushioning, and smooth-rolling casters.\",\n",
        "        \"category\": \"Furniture\",\n",
        "        \"subcategory\": \"Office Chairs\",\n",
        "        \"price\": 329.99,\n",
        "        \"brand\": \"ComfortDesign\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"chair-002\",\n",
        "        \"name\": \"ErgoFlex Task Chair\",\n",
        "        \"description\": \"Mid-range ergonomic task chair with fixed lumbar support, height-adjustable armrests, and mesh back. Perfect for home offices and everyday use.\",\n",
        "        \"category\": \"Furniture\",\n",
        "        \"subcategory\": \"Office Chairs\",\n",
        "        \"price\": 179.99,\n",
        "        \"brand\": \"ComfortDesign\"\n",
        "    },\n",
        "    {\n",
        "        \"id\": \"chair-003\",\n",
        "        \"name\": \"ErgoFlex Budget Chair\",\n",
        "        \"description\": \"Affordable office chair with basic ergonomic features, armrests, and fabric upholstery. A practical choice for occasional use or budget-conscious shoppers.\",\n",
        "        \"category\": \"Furniture\",\n",
        "        \"subcategory\": \"Office Chairs\",\n",
        "        \"price\": 89.99,\n",
        "        \"brand\": \"ComfortDesign\"\n",
        "    }\n",
        "]"
      ],
      "metadata": {
        "id": "a0s9COrbNsHJ"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Create BigQuery Table with Custom Schema\n",
        "\n",
        "Now, let's create a BigQuery table with a custom schema that promotes the metadata fields to top-level columns:"
      ],
      "metadata": {
        "id": "0wVVI7kjN562"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Create table with custom schema for vector embeddings\n",
        "CUSTOM_TABLE_ID = f\"{PROJECT_ID}.{DATASET_ID}.custom_product_embeddings\"\n",
        "custom_schema = [\n",
        "    bigquery.SchemaField(\"id\", \"STRING\", mode=\"REQUIRED\"),\n",
        "    bigquery.SchemaField(\"embedding\", \"FLOAT64\", mode=\"REPEATED\"),\n",
        "    bigquery.SchemaField(\"content\", \"STRING\"),\n",
        "    bigquery.SchemaField(\"name\", \"STRING\"),\n",
        "    bigquery.SchemaField(\"category\", \"STRING\"),\n",
        "    bigquery.SchemaField(\"subcategory\", \"STRING\"),\n",
        "    bigquery.SchemaField(\"price\", \"FLOAT64\"),\n",
        "    bigquery.SchemaField(\"brand\", \"STRING\")\n",
        "]\n",
        "\n",
        "custom_table = bigquery.Table(CUSTOM_TABLE_ID, schema=custom_schema)\n",
        "try:\n",
        "    client.get_table(custom_table)\n",
        "    print(f\"Table {CUSTOM_TABLE_ID} already exists\")\n",
        "except Exception:\n",
        "    custom_table = client.create_table(custom_table)\n",
        "    print(f\"Created table {CUSTOM_TABLE_ID}\")"
      ],
      "metadata": {
        "id": "rQYxtUdpN8qk"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Define Pipeline components\n",
        "\n",
        "Our pipeline:\n",
        "- Ingests product data as dictionaries\n",
        "- Converts each product dictionary to a `Chunk`\n",
        "- Generates embeddings\n",
        "- Writes embeddings and metadata to a BigQuery table with a custom schema"
      ],
      "metadata": {
        "id": "W9PYRkmn8D9n"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Convert product dictionary\n",
        "We define a function to convert each ingested product dictionary to a `Chunk`, configuring which text to embed and which fields to treat as metadata."
      ],
      "metadata": {
        "id": "6klWYJSC8fjT"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from typing import Dict, Any\n",
        "\n",
        "def create_chunk(product: Dict[str, Any]) -> Chunk:\n",
        "    \"\"\"Convert a product dictionary into a Chunk object.\n",
        "\n",
        "    Args:\n",
        "        product: Dictionary containing product information\n",
        "\n",
        "    Returns:\n",
        "        Chunk: A Chunk object ready for embedding\n",
        "    \"\"\"\n",
        "    # Combine name and description for embedding\n",
        "    text_to_embed = f\"{product['name']}: {product['description']}\"\n",
        "\n",
        "    return Chunk(\n",
        "        content=Content(text=text_to_embed),  # The text that will be embedded\n",
        "        id=product['id'],                     # Use product ID as chunk ID\n",
        "        metadata=product,                     # Store all product info in metadata\n",
        "    )"
      ],
      "metadata": {
        "id": "opbmdMDsQcjW"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Generate embeddings with HuggingFace\n",
        "We configure a local pre-trained Hugging Face model to create vector embeddings from the product descriptions."
      ],
      "metadata": {
        "id": "NqZQet2D88W3"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure the embedding model\n",
        "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
        "    model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
        ")"
      ],
      "metadata": {
        "id": "StbJsuoX9EMC"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Configure BigQuery Vector Writer\n",
        "To write embedded data to a BigQuery table with a custom schema, we need to:\n",
        "- Provide the BigQuery table schema\n",
        "- Define a function to convert the embedded `Chunk` to a dictionary that matches our BigQuery schema"
      ],
      "metadata": {
        "id": "l2xd8BD2OBCe"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Define BigQuery schema\n",
        "SCHEMA = {\n",
        "    'fields': [\n",
        "        {'name': 'id', 'type': 'STRING'},\n",
        "        {'name': 'embedding', 'type': 'FLOAT64', 'mode': 'REPEATED'},\n",
        "        {'name': 'content', 'type': 'STRING'},\n",
        "        {'name': 'name', 'type': 'STRING'},\n",
        "        {'name': 'category', 'type': 'STRING'},\n",
        "        {'name': 'subcategory', 'type': 'STRING'},\n",
        "        {'name': 'price', 'type': 'FLOAT64'},\n",
        "        {'name': 'brand', 'type': 'STRING'}\n",
        "    ]\n",
        "}\n",
        "\n",
        "# Define function to convert Chunk to dictionary with the custom schema\n",
        "def chunk_to_dict_custom(chunk: Chunk) -> Dict[str, Any]:\n",
        "    \"\"\"Convert a Chunk to a dictionary matching our custom schema.\"\"\"\n",
        "    # Extract metadata\n",
        "    metadata = chunk.metadata\n",
        "\n",
        "    # Map to custom schema\n",
        "    return {\n",
        "        'id': chunk.id,\n",
        "        'embedding': chunk.embedding.dense_embedding,\n",
        "        'content': chunk.content.text,\n",
        "        'name': metadata.get('name', ''),\n",
        "        'category': metadata.get('category', ''),\n",
        "        'subcategory': metadata.get('subcategory', ''),\n",
        "        'price': float(metadata.get('price', 0)),\n",
        "        'brand': metadata.get('brand', '')\n",
        "    }\n"
      ],
      "metadata": {
        "id": "1kECJe5xOAMh"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
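    {
      "cell_type": "markdown",
      "source": [
        "To preview the row shape `chunk_to_dict_custom` produces, we can apply the same field mapping to one sample product. This is an illustration only: a plain dictionary stands in for the `Chunk` metadata, and the embedding is a placeholder (real `all-MiniLM-L6-v2` vectors have 384 dimensions):"
      ],
      "metadata": {
        "id": "x2RowShapeDemoMd"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Illustration: the custom-schema mapping applied to a plain product dict.\n",
        "product = {\n",
        "    'id': 'laptop-001',\n",
        "    'name': 'UltraBook Pro X15',\n",
        "    'description': 'Powerful ultralight laptop...',\n",
        "    'category': 'Electronics',\n",
        "    'subcategory': 'Laptops',\n",
        "    'price': 1899.99,\n",
        "    'brand': 'TechMaster'\n",
        "}\n",
        "row = {\n",
        "    'id': product['id'],\n",
        "    'embedding': [0.12, -0.03, 0.87],  # placeholder vector\n",
        "    'content': f\"{product['name']}: {product['description']}\",\n",
        "    'name': product.get('name', ''),\n",
        "    'category': product.get('category', ''),\n",
        "    'subcategory': product.get('subcategory', ''),\n",
        "    'price': float(product.get('price', 0)),\n",
        "    'brand': product.get('brand', '')\n",
        "}\n",
        "print(sorted(row.keys()))"
      ],
      "metadata": {
        "id": "x2RowShapeDemoCode"
      },
      "execution_count": null,
      "outputs": []
    },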
    {
      "cell_type": "markdown",
      "source": [
        "Now we create a `BigQueryVectorWriterConfig`, passing a `SchemaConfig` that bundles the schema and conversion function."
      ],
      "metadata": {
        "id": "qZLS9DiN-aRZ"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "custom_writer_config = BigQueryVectorWriterConfig(\n",
        "    write_config={\n",
        "        'table': CUSTOM_TABLE_ID,\n",
        "        'create_disposition': 'CREATE_IF_NEEDED',\n",
        "        'write_disposition': 'WRITE_TRUNCATE'  # Overwrite existing data\n",
        "    },\n",
        "    schema_config=SchemaConfig(\n",
        "        schema=SCHEMA,\n",
        "        chunk_to_dict_fn=chunk_to_dict_custom\n",
        "    )\n",
        ")"
      ],
      "metadata": {
        "id": "ZZJYh6CFQrAh"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Assemble and Run the Pipeline"
      ],
      "metadata": {
        "id": "2p2EgUaa-sGI"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import tempfile\n",
        "\n",
        "options = pipeline_options.PipelineOptions([f\"--temp_location={TEMP_GCS_LOCATION}\"])\n",
        "\n",
        "\n",
        "# Run batch pipeline with custom schema\n",
        "with beam.Pipeline(options=options) as p:\n",
        "    _ = (\n",
        "        p\n",
        "        | 'Create Products' >> beam.Create(FILTERED_PRODUCTS_DATA)\n",
        "        | 'Convert to Chunks' >> beam.Map(create_chunk)\n",
        "        | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
        "          .with_transform(huggingface_embedder)\n",
        "        | 'Write to BigQuery' >> VectorDatabaseWriteTransform(custom_writer_config)\n",
        "    )"
      ],
      "metadata": {
        "id": "4XVt3kdkQzhB"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Verify Custom Schema Embeddings\n",
        "\n",
        "Let's check what was written to our custom schema table:"
      ],
      "metadata": {
        "id": "lHOK5zSjRZEp"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Query to verify the custom schema embeddings\n",
        "query = f\"\"\"\n",
        "SELECT\n",
        "  id,\n",
        "  name,\n",
        "  category,\n",
        "  subcategory,\n",
        "  price,\n",
        "  brand\n",
        "FROM\n",
        "  `{CUSTOM_TABLE_ID}`\n",
        "ORDER BY category, subcategory, price\n",
        "LIMIT 5\n",
        "\"\"\"\n",
        "\n",
        "# Run the query\n",
        "query_job = client.query(query)\n",
        "results = query_job.result()\n",
        "\n",
        "# Display results\n",
        "print(\"First 5 Products in Custom Schema Table:\")\n",
        "print(\"-\" * 80)\n",
        "for row in results:\n",
        "    print(f\"ID: {row.id}\")\n",
        "    print(f\"Name: {row.name}\")\n",
        "    print(f\"Category: {row.category} > {row.subcategory}\")\n",
        "    print(f\"Price: ${row.price}\")\n",
        "    print(f\"Brand: {row.brand}\")\n",
        "    print(\"-\" * 80)"
      ],
      "metadata": {
        "id": "f1IA9mXNQ50C"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Advanced: Vector Search with Metadata Filter\n",
        "\n",
        "Prerequisites:\n",
        "- Advanced: Example with Custom Schema\n",
        "\n",
        "\n",
        "Now let's demonstrate how to perform vector search with filtering using our custom schema.\n",
        "\n",
        "Our pipeline:\n",
        "- Reads messages from PubSub, each containing a `query` and a `max_price` filter\n",
        "- Generates an embedding for each `query`\n",
        "- Performs vector search with an additional `max_price` metadata filter\n",
        "\n",
        "## Sample Queries with Filter Requirements\n",
        "We define a list of messages to publish to PubSub; these are the inputs our pipeline ingests."
      ],
      "metadata": {
        "id": "0OtXFuI-RfBg"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "FILTERED_QUERIES = [\n",
        "    {\"query\": \"I need a powerful laptop for video editing\", \"max_price\": 2000},\n",
        "    {\"query\": \"Looking for noise-cancelling headphones\", \"max_price\": 300},\n",
        "    {\"query\": \"What's a good ergonomic office chair?\", \"max_price\": 200},\n",
        "    {\"query\": \"I want an affordable coffee maker\", \"max_price\": 100},\n",
        "    {\"query\": \"Need a premium laptop with good specs\", \"max_price\": 1500}\n",
        "]"
      ],
      "metadata": {
        "id": "NELWZcNDRccK"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Create PubSub Topic\n",
        "We create a PubSub topic to serve as our pipeline's data source."
      ],
      "metadata": {
        "id": "2Uz0JreARtB7"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Create pubsub topic for filtered queries\n",
        "TOPIC = \"\" # @param {type:'string'}\n",
        "\n",
        "topic_path = create_pubsub_topic(PROJECT_ID, TOPIC)\n"
      ],
      "metadata": {
        "id": "CcBvTEHCRpR6"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Define Pipeline components"
      ],
      "metadata": {
        "id": "0YHC214OJXB7"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Process PubSub messages"
      ],
      "metadata": {
        "id": "M76zgDpJJs11"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "def process(message):\n",
        "    \"\"\"Convert a filtered query message to a Chunk for embedding and search.\"\"\"\n",
        "    message_data = json.loads(message.decode('utf-8'))\n",
        "    return Chunk(\n",
        "        content=Content(text=message_data['query']),\n",
        "        metadata={\n",
        "            \"max_price\": message_data['max_price']\n",
        "        }\n",
        "    )"
      ],
      "metadata": {
        "id": "TDxM-JjWJw9H"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Configure embedding model"
      ],
      "metadata": {
        "id": "o1bpd6eLJlKz"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure the embedding model\n",
        "huggingface_embedder = HuggingfaceTextEmbeddings(\n",
        "    model_name=\"sentence-transformers/all-MiniLM-L6-v2\"\n",
        ")"
      ],
      "metadata": {
        "id": "lOT6_2RKJmlN"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "### Configure Vector Search with Metadata Filter\n",
        "Vector search returns the most semantically similar product whose price does not exceed `max_price`."
      ],
      "metadata": {
        "id": "rZls_WhtSEAh"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Configure vector search parameters with metadata_restriction_template\n",
        "vector_search_params = BigQueryVectorSearchParameters(\n",
        "    project=PROJECT_ID,\n",
        "    table_name=CUSTOM_TABLE_ID,\n",
        "    embedding_column=\"embedding\",\n",
        "    columns=[\"id\", \"name\", \"category\", \"subcategory\", \"price\", \"brand\", \"content\"],\n",
        "    neighbor_count=1,\n",
        "    metadata_restriction_template=\"price <= {max_price}\"\n",
        ")\n",
        "\n",
        "# Create search handler\n",
        "search_handler = BigQueryVectorSearchEnrichmentHandler(\n",
        "    vector_search_parameters=vector_search_params,\n",
        "    min_batch_size=1,\n",
        "    max_batch_size=5\n",
        ")"
      ],
      "metadata": {
        "id": "lzikN9-TSBWS"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
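    {
      "cell_type": "markdown",
      "source": [
        "To see the filter clause this configuration produces, here is a sketch of how a restriction template can be filled from a chunk's metadata with `str.format`. This only illustrates the template semantics; the handler's actual internals may differ:"
      ],
      "metadata": {
        "id": "x3FilterDemoMd"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Sketch: filling the restriction template from chunk metadata.\n",
        "template = 'price <= {max_price}'\n",
        "chunk_metadata = {'max_price': 200}\n",
        "restriction = template.format(**chunk_metadata)\n",
        "print(restriction)  # price <= 200"
      ],
      "metadata": {
        "id": "x3FilterDemoCode"
      },
      "execution_count": null,
      "outputs": []
    },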
    {
      "cell_type": "markdown",
      "source": [
        "### Log the enriched query"
      ],
      "metadata": {
        "id": "hvI3ib8iLRxD"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "def format_filtered_results(chunk):\n",
        "    \"\"\"Format filtered search results for display.\"\"\"\n",
        "    # Extract results from enrichment_data\n",
        "    results = chunk.metadata.get(\"enrichment_data\", {}).get(\"chunks\", [])\n",
        "    max_price = chunk.metadata.get(\"max_price\")\n",
        "\n",
        "    # Log the query\n",
        "    print(\"\\n=== PRICE-FILTERED QUERY ===\")\n",
        "    print(f\"Query: \\\"{chunk.content.text}\\\"\")\n",
        "    print(f\"Max Price: ${max_price}\")\n",
        "\n",
        "    # Log the results\n",
        "    print(f\"\\nFound {len(results)} matching products under ${max_price}:\")\n",
        "\n",
        "    if results:\n",
        "        for i, result in enumerate(results, 1):\n",
        "            # Print product details\n",
        "            print(f\"\\nResult {i}:\")\n",
        "            print(f\"  Product: {result.get('name', 'Unknown')}\")\n",
        "            print(f\"  Category: {result.get('category', 'Unknown')} > {result.get('subcategory', 'Unknown')}\")\n",
        "            print(f\"  Price: ${result.get('price', 'Unknown')}\")\n",
        "            print(f\"  Brand: {result.get('brand', 'Unknown')}\")\n",
        "            print(f\"  Description: {result.get('content', 'Unknown')}\")\n",
        "            print(f\"  Similarity distance: {result.get('distance', 'Unknown')}\")\n",
        "\n",
        "            # Verify price is under max\n",
        "            price = float(result.get('price', 0))\n",
        "            print(f\"  Price Check: {'✓' if price <= max_price else '✗'}\")\n",
        "    else:\n",
        "        print(\"  No matching products found.\")\n",
        "\n",
        "    print(\"=\" * 80)\n",
        "\n",
        "    return chunk"
      ],
      "metadata": {
        "id": "ohrj6qJ1YtCI"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
    {
      "cell_type": "markdown",
      "source": [
        "## Run the vector search pipeline with metadata filtering"
      ],
      "metadata": {
        "id": "AitR4hIJ_TCs"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import tempfile\n",
        "from apache_beam.transforms import trigger\n",
        "\n",
        "print(\"Starting publisher thread...\")\n",
        "publisher_thread = threading.Thread(\n",
        "    target=publisher_function,\n",
        "    args=(PROJECT_ID, TOPIC, FILTERED_QUERIES),\n",
        "    daemon=True\n",
        ")\n",
        "publisher_thread.start()\n",
        "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n",
        "\n",
        "options = pipeline_options.PipelineOptions()\n",
        "options.view_as(pipeline_options.StandardOptions).streaming = True\n",
        "# Run the streaming pipeline with price filtering\n",
        "with beam.Pipeline(options=options) as p:\n",
        "    results = (\n",
        "        p\n",
        "        | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=topic_path)\n",
        "        | 'Process Messages' >> beam.Map(process)\n",
        "        | 'Window' >> beam.WindowInto(beam.window.GlobalWindows(),\n",
        "                             trigger=trigger.Repeatedly(\n",
        "                                trigger.AfterProcessingTime(\n",
        "                                    30)),\n",
        "                             accumulation_mode=trigger.AccumulationMode.DISCARDING)\n",
        "        | 'Generate Embeddings' >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
        "          .with_transform(huggingface_embedder)\n",
        "        | 'Price-Filtered Vector Search' >> Enrichment(search_handler)\n",
        "        | 'Format Filtered Results' >> beam.Map(format_filtered_results)\n",
        "    )"
      ],
      "metadata": {
        "id": "yB_3nOwkYvA5"
      },
      "execution_count": null,
      "outputs": [{"output_type": "stream", "name": "stdout", "text": ["\n"]}]
    },
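    {
      "cell_type": "markdown",
      "source": [
        "The `metadata_restriction_template` set earlier (`\"price <= {max_price}\"`) is resolved per chunk: the handler substitutes values from each chunk's metadata into the template before running the `VECTOR_SEARCH` query, which is how each query gets its own price ceiling. A minimal plain-Python sketch of that substitution step (the metadata values here are hypothetical, not from the dataset):\n",
        "\n",
        "```python\n",
        "template = \"price <= {max_price}\"\n",
        "chunk_metadata = {\"max_price\": 100}  # hypothetical chunk metadata\n",
        "restriction = template.format(**chunk_metadata)\n",
        "print(restriction)  # price <= 100\n",
        "```"
      ],
      "metadata": {
        "id": "mFilterSketch01"
      }
    },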
    {
      "cell_type": "markdown",
      "source": [
        "# What's next?\n",
        "- Check out the [Apache Beam RAG package documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.html)\n",
        "- [Configure BigQueryVectorSearchParameters](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.enrichment.bigquery_vector_search.html#apache_beam.ml.rag.enrichment.bigquery_vector_search.BigQueryVectorSearchParameters.options) further by setting `options` or `distance_type`, as described in the [BigQuery VECTOR_SEARCH documentation](https://cloud.google.com/bigquery/docs/reference/standard-sql/search_functions#vector_search)\n",
        "- Split large documents into smaller `Chunk`s with [LangChain](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.chunking.langchain.html)"
      ],
      "metadata": {
        "id": "yC9hcNj2ARxL"
      }
    }
  ]
}
